1 /*
2  * Collie - An asynchronous event-driven network framework using Dlang development
3  *
4  * Copyright (C) 2015-2017  Shanghai Putao Technology Co., Ltd 
5  *
6  * Developer: putao's Dlang team
7  *
8  * Licensed under the Apache-2.0 License.
9  *
10  */
11 module collie.codec.http.codec.websocketcodec;
12 import kiss.logger;
13 import collie.codec.http.codec.httpcodec;
14 import collie.codec.http.httptansaction;
15 import std.bitmanip;
16 import collie.codec.http.codec.wsframe;
17 import collie.codec.http.httpmessage;
18 import collie.codec.http.errocode;
19 import std.conv;
20 
/// Largest payload, in bytes, carried by one outgoing frame (512 * 512 * 2 = 524288);
/// longer messages are split into several frames of at most this size.
enum FRAME_SIZE_IN_BYTES = 512 * 512 * 2;
22 
23 
/**
 * HTTPCodec implementation for the WebSocket framing layer.
 *
 * Incoming bytes are pushed through an incremental state machine (readFrame)
 * that rebuilds one WSFrame at a time and reports each finished (or rejected)
 * frame through CallBack.onWsFrame. Outgoing frames are produced by
 * generateWsFrame. The HTTP-oriented generate* overrides write nothing and
 * return 0.
 */
class WebsocketCodec : HTTPCodec
{
	/// States of the ingress frame parser; the parser starts in PS_READ_HEADER.
	enum ProcessingState
	{
		PS_READ_HEADER,               // first header byte: FIN, RSV1-3, opcode
		PS_READ_Do_LENGTH,            // second header byte: MASK bit + 7-bit length
		PS_READ_PAYLOAD_LENGTH_1,     // second byte of a 16-bit length split across buffers
		PS_READ_PAYLOAD_LENGTH,       // 16-bit extended length
		PS_READ_BIG_PAYLOAD_LENGTH,   // 64-bit extended length
		PS_READ_BIG_PAYLOAD_LENGTH_1, // rest of a 64-bit length split across buffers
		PS_READ_MASK,                 // 4-byte masking key
		PS_READ_MASK_1,               // rest of a masking key split across buffers
		PS_READ_PAYLOAD               // payload bytes
	}

	/**
	 * Params:
	 *   direc = transport direction of this codec; frames are masked on egress
	 *           when it is TransportDirection.UPSTREAM (see doMask)
	 *   txn   = transaction passed to the callback and notified of errors
	 */
	this(TransportDirection direc, HTTPTransaction txn)
	{
		_transportDirection = direc;
		_transaction = txn;
	}

	/// Always CodecProtocol.WEBSOCKET.
	override CodecProtocol getProtocol() {
		return CodecProtocol.WEBSOCKET;
	}

	/// Reports REMOTE_CLOSED to the transaction and clears its handler and transport.
	override void onConnectClose()
	{
		if(_transaction){
			_transaction.onErro(HTTPErrorCode.REMOTE_CLOSED);
			_transaction.handler = null;
			_transaction.transport = null;
		}
	}
	
	/// Reports TIME_OUT to the transaction, if one is still attached.
	override void onTimeOut()
	{
		if(_transaction){
			_transaction.onErro(HTTPErrorCode.TIME_OUT);
		}
	}
	
	/// Forgets the transaction when `txn` is the one this codec holds.
	override void detach(HTTPTransaction txn)
	{
		if(txn is _transaction)
			_transaction = null;
	}

	
	/// Returns the direction given to the constructor.
	override TransportDirection getTransportDirection()
	{
		return _transportDirection;
	}
	
	/// Always returns stream id 0.
	override StreamID createStream() {
		return 0;
	}
	
	/// Returns `!_finished`. NOTE(review): no code in this class assigns
	/// `_finished`, so this currently always reports busy - confirm intent.
	override bool isBusy() {
		return !_finished;
	}
	
	/// True once a Close frame has been generated by generateWsFrame.
	override bool shouldClose()
	{
		return _shouldClose;
	}
	
	/// No-op: this codec has no pausable parser.
	override void setParserPaused(bool paused){}
	
	/// Sets the receiver of CallBack.onWsFrame notifications.
	override void setCallback(CallBack callback) {
		_callback = callback;
	}
	
	/**
	 * Feeds received bytes to the frame parser.
	 *
	 * Returns: `buf.length`; the whole buffer is always consumed.
	 */
	override size_t onIngress(ubyte[] buf)
	{
		readFrame(buf);
		return buf.length;
	}
	
	/// Not used for WebSocket; writes nothing and returns 0.
	override size_t generateHeader(
		HTTPTransaction txn,
		HTTPMessage msg,
		HttpWriteBuffer buffer,
		bool eom = false)
	{
		return 0;
	}
	
	/// Not used for WebSocket; writes nothing and returns 0.
	override size_t generateBody(HTTPTransaction txn,
		HttpWriteBuffer chain,in ubyte[] data,
		bool eom)
	{
		return 0;
	}
	
	/// Not used for WebSocket; writes nothing and returns 0.
	override size_t generateChunkHeader(
		HTTPTransaction txn,
		HttpWriteBuffer buffer,
		size_t length)
	{
		return 0;
	}
	
	
	/// Not used for WebSocket; writes nothing and returns 0.
	override size_t generateChunkTerminator(
		HTTPTransaction txn,
		HttpWriteBuffer buffer)
	{
		return 0;
	}
	
	/// Not used for WebSocket; writes nothing and returns 0.
	override size_t generateEOM(HTTPTransaction txn,
		HttpWriteBuffer buffer)
	{
		return 0;
	}

	/// Not used for WebSocket; writes nothing and returns 0.
	override size_t  generateRstStream(HTTPTransaction txn,
		HttpWriteBuffer buffer,HTTPErrorCode code)
	{
		return 0;
	}

	/**
	 * Serialises `data` into one or more WebSocket frames appended to `buffer`.
	 *
	 * Control frames (opcode bit 0x08 set) are truncated to 125 payload bytes.
	 * Payloads longer than FRAME_SIZE_IN_BYTES are fragmented: the first frame
	 * carries `code`, the following ones OpCodeContinue, and only the last has
	 * the FIN bit. An empty payload still produces a single empty frame.
	 * Generating a Close frame makes shouldClose() return true.
	 *
	 * Params:
	 *   txn    = unused
	 *   buffer = destination for the frame bytes
	 *   code   = opcode of the message
	 *   data   = message payload; the caller's bytes are never modified
	 *
	 * Returns: `buffer.length` after writing (the value the buffer reports,
	 *          not the number of bytes appended by this call).
	 */
	override size_t generateWsFrame(HTTPTransaction txn,
		HttpWriteBuffer buffer,OpCode code, ubyte[] data)
	{
		if((code & 0x08) == 0x08 && (data.length > 125))
			data = data[0 .. 125];
		if(code == OpCode.OpCodeClose)
			_shouldClose = true;

		const bool mustMask = doMask();
		size_t offset = 0;
		bool isFirstFrame = true;
		// do/while so that an empty payload still yields exactly one frame, and
		// a trailing partial chunk is sent instead of being dropped.
		do
		{
			const size_t remaining = data.length - offset;
			const size_t payloadLength = remaining < FRAME_SIZE_IN_BYTES ? remaining
				: FRAME_SIZE_IN_BYTES;
			const bool isLastFrame = (offset + payloadLength) == data.length;
			const OpCode opcode = isFirstFrame ? code : OpCode.OpCodeContinue;
			ubyte[] chunk = data[offset .. (offset + payloadLength)];

			getFrameHeader(opcode, payloadLength, isLastFrame, buffer);
			if (mustMask)
			{
				ubyte[4] mask = generateMaskingKey();
				buffer.write(mask[]);
				// Mask a private copy, indexing the key by payload offset
				// (RFC 6455 section 5.3), so neither the caller's data nor
				// bytes already in the buffer are touched.
				ubyte[] masked = chunk.dup;
				foreach (k, ref b; masked)
					b ^= mask[k % 4];
				buffer.write(masked);
			}
			else
			{
				buffer.write(chunk);
			}
			offset += payloadLength;
			isFirstFrame = false;
		} while (offset < data.length);
		return buffer.length;
	}

	/**
	 * Produces a fresh 4-byte masking key for one outgoing frame.
	 *
	 * The key is drawn from std.random's default generator rather than the
	 * wall clock, so consecutive frames get different keys.
	 * NOTE(review): std.random is not a cryptographic generator; RFC 6455
	 * section 5.3 asks for a strong entropy source - swap in one if available.
	 */
	ubyte[4] generateMaskingKey() // used when acting as the client
	{
		import std.random : uniform;
		import std.bitmanip : nativeToBigEndian;
		return nativeToBigEndian(uniform!uint());
	}

protected:
	/// Outgoing frames are masked only when the codec direction is UPSTREAM.
	bool doMask(){return _transportDirection ==  TransportDirection.UPSTREAM;}

	/**
	 * Writes the frame header (without the masking key) to `buffer`.
	 *
	 * Params:
	 *   code          = opcode placed in the low 4 bits of the first byte
	 *   payloadLength = payload size; encoded as 7-bit, 16-bit or 64-bit length
	 *   lastFrame     = sets the FIN bit when true
	 *   buffer        = destination for the header bytes
	 */
	void getFrameHeader(OpCode code, size_t payloadLength, bool lastFrame, HttpWriteBuffer buffer)
	{
		ubyte[2] wdata = [0, 0];
		wdata[0] = cast(ubyte)((code & 0x0F) | (lastFrame ? 0x80 : 0x00));
		if(doMask())
			wdata[1] = 0x80;
		if (payloadLength <= 125){
			wdata[1] |= to!ubyte(payloadLength);
			buffer.write(wdata[]);
		} else if (payloadLength <= ushort.max) {
			wdata[1] |= 126;
			buffer.write(wdata[]);
			ubyte[2] length = nativeToBigEndian(to!ushort(payloadLength));
			buffer.write(length[]);
		} else {
			wdata[1] |= 127;
			buffer.write(wdata[]);
			// Widen explicitly: the wire format needs 8 bytes even where
			// size_t is only 32 bits wide.
			ubyte[8] length = nativeToBigEndian(cast(ulong) payloadLength);
			buffer.write(length[]);
		}
	}

	/**
	 * Validates the header fields parsed so far and records the outcome in
	 * `frame` (_isValid, and on failure _closeCode / _closeReason).
	 *
	 * Reads `_length` for the control-frame size check, so the caller must have
	 * stored the 7-bit length field there first.
	 *
	 * Returns: `frame._isValid`.
	 */
	bool checkValidity()
	{
		void setError(CloseCode code, string closeReason)
		{
			frame._closeCode = code;
			frame._closeReason = closeReason;
			frame._isValid = false;
		}
		
		if (frame._rsv1 || frame._rsv2 || frame._rsv3)
		{
			setError(CloseCode.CloseCodeProtocolError, "Rsv field is non-zero");
		}
		else if (isOpCodeReserved(frame._opCode))
		{
			setError(CloseCode.CloseCodeProtocolError, "Used reserved opcode");
		}
		else if (frame.isControlFrame())
		{
			if (_length > 125)
			{
				setError(CloseCode.CloseCodeProtocolError, "Control frame is larger than 125 bytes");
			}
			else if (!frame._isFinalFrame)
			{
				setError(CloseCode.CloseCodeProtocolError, "Control frames cannot be fragmented");
			}
			else
			{
				frame._isValid = true;
			}
		}
		else
		{
			frame._isValid = true;
		}
		return frame._isValid;
	}

	/// True for opcodes strictly between OpCodeBinary and OpCodeClose, or above OpCodePong.
	bool isOpCodeReserved(OpCode code)
	{
		return ((code > OpCode.OpCodeBinary) && (code < OpCode.OpCodeClose))
			|| (code > OpCode.OpCodePong);
	}

	/// Resets the parser to PS_READ_HEADER with a fresh WSFrame.
	/// `_lastcode` and `_length` are left untouched.
	pragma(inline)
	void clear()
	{
		_state = ProcessingState.PS_READ_HEADER;
		_mask[] = 0;
		_hasMask = false;
		_buffer[] = 0;
		_readLen = 0;
		frame = WSFrame();
	}

	/**
	 * Incremental frame parser. May be called with arbitrary slices of the
	 * byte stream; partial header fields and payloads are carried over between
	 * calls in `_buffer`, `_mask`, `_readLen` and `frame`.
	 *
	 * Each completed or rejected frame is handed to the callback. After a
	 * rejected frame parsing resumes at the next byte as if it started a new
	 * frame.
	 *
	 * Params:
	 *   data = next slice of received bytes; fully consumed
	 */
	void readFrame(in ubyte[] data)
	{
		const size_t len = data.length;
		for (size_t i = 0; i < len; ++i)
		{
			const ubyte ch = data[i];
			final switch (_state)
			{
				case ProcessingState.PS_READ_HEADER:
				{
					frame._isFinalFrame = (ch & 0x80) != 0;
					frame._rsv1 = ((ch & 0x40) != 0);
					frame._rsv2 = ((ch & 0x20) != 0);
					frame._rsv3 = ((ch & 0x10) != 0);
					frame._opCode = cast(OpCode)(ch & 0x0F);
					_state = ProcessingState.PS_READ_Do_LENGTH;
				}
					break;
				case ProcessingState.PS_READ_Do_LENGTH:
				{
					_hasMask = (ch & 0x80) != 0;
					const size_t tlen = (ch & 0x7F);
					// Store the raw 7-bit field before validating: the markers
					// 126/127 are > 125, so control frames announcing an
					// extended length are rejected instead of being compared
					// against a stale value from an earlier frame.
					_length = tlen;
					if (!checkValidity())
					{
						_state = ProcessingState.PS_READ_HEADER;
						dispatchFrame();
					}
					else if (tlen == 126)
					{
						_state = ProcessingState.PS_READ_PAYLOAD_LENGTH;
					}
					else if (tlen == 127)
					{
						_state = ProcessingState.PS_READ_BIG_PAYLOAD_LENGTH;
					}
					else
					{
						onLengthKnown();
					}
				}
					break;
				case ProcessingState.PS_READ_PAYLOAD_LENGTH:
				{
					if (len - i >= 2)
					{
						ubyte[2] raw = data[i .. (i + 2)];
						++i; // the loop increment consumes the second byte
						_length = bigEndianToNative!(ushort)(raw);
						onLengthKnown();
					}
					else
					{
						// only the high byte is available; keep it for the next call
						_buffer[] = 0;
						_buffer[0] = ch;
						_state = ProcessingState.PS_READ_PAYLOAD_LENGTH_1;
					}
				}
					break;
				case ProcessingState.PS_READ_PAYLOAD_LENGTH_1:
				{
					_buffer[1] = ch;
					ubyte[2] raw = _buffer[0 .. 2];
					_length = bigEndianToNative!ushort(raw);
					onLengthKnown();
				}
					break;
				case ProcessingState.PS_READ_BIG_PAYLOAD_LENGTH:
				{
					const size_t avail = len - i;
					if (avail >= 8)
					{
						ubyte[8] raw = data[i .. (i + 8)];
						i += 7; // the loop increment consumes the eighth byte
						acceptBigLength(bigEndianToNative!ulong(raw));
					}
					else
					{
						_buffer[] = 0;
						_buffer[0 .. avail] = data[i .. $];
						_readLen = avail;
						i += avail;
						_state = ProcessingState.PS_READ_BIG_PAYLOAD_LENGTH_1;
					}
				}
					break;
				case ProcessingState.PS_READ_BIG_PAYLOAD_LENGTH_1:
				{
					const size_t avail = len - i;
					const size_t missing = 8 - _readLen;
					if (avail >= missing)
					{
						_buffer[_readLen .. 8] = data[i .. (i + missing)];
						i += missing;
						--i; // leave the last consumed byte to the loop increment
						acceptBigLength(bigEndianToNative!ulong(_buffer));
					}
					else
					{
						_buffer[_readLen .. (_readLen + avail)] = data[i .. $];
						_readLen += avail;
						i += avail;
					}
				}
					break;
				case ProcessingState.PS_READ_MASK:
				{
					const size_t avail = len - i;
					if (avail >= 4)
					{
						_mask[] = data[i .. (i + 4)];
						i += 3; // the loop increment consumes the fourth byte
						beginPayload();
					}
					else
					{
						_mask[] = 0;
						_mask[0 .. avail] = data[i .. $];
						_readLen = avail;
						i += avail;
						_state = ProcessingState.PS_READ_MASK_1;
					}
				}
					break;
				case ProcessingState.PS_READ_MASK_1:
				{
					const size_t avail = len - i;
					const size_t missing = 4 - _readLen;
					if (avail >= missing)
					{
						_mask[_readLen .. 4] = data[i .. (i + missing)];
						i += missing;
						--i; // leave the last consumed byte to the loop increment
						beginPayload();
					}
					else
					{
						_mask[_readLen .. (_readLen + avail)] = data[i .. $];
						_readLen += avail;
						i += avail;
					}
				}
					break;
				case ProcessingState.PS_READ_PAYLOAD:
				{
					if(_length>0) logDebugf("_length = %d / %d", _length, len);
					const size_t avail = len - i;
					const size_t missing = _length - _readLen;
					if (avail >= missing)
					{
						frame.data[_readLen .. (_readLen + missing)] = data[i .. (i + missing)];
						i += missing;
						--i; // leave the last consumed byte to the loop increment
						_state = ProcessingState.PS_READ_HEADER;
						dispatchFrame();
					}
					else
					{
						frame.data[_readLen .. (_readLen + avail)] = data[i .. $];
						_readLen += avail;
						// The rest of the buffer has been copied; skip past it
						// so the same bytes are not copied again on the next
						// loop iteration.
						i += avail;
					}
				}
					break;
					
			}
		}
	}

private:
	/// Finalises the current frame, hands it to the callback and resets the parser.
	void dispatchFrame()
	{
		if (frame.isValid)
		{
			if (frame.isDataFrame())
			{
				// Continuation frames inherit the opcode of the frame that
				// started the message.
				if (!frame.isContinuationFrame())
				{
					_lastcode = frame.opCode();
				}
				frame._lastCode = _lastcode;
			}
			if (_hasMask)
			{
				// Unmask every masked frame, control frames included
				// (RFC 6455 section 5.3 applies the mask to all payloads).
				for (size_t i = 0; i < _length; ++i)
				{
					frame.data[i] = frame.data[i] ^ _mask[i % 4];
				}
			}
		}
		if(_callback)
			_callback.onWsFrame(_transaction,frame);
		clear();
	}

	/// Enters payload reading; a frame with an empty payload is complete at
	/// once and is dispatched without waiting for further input.
	void beginPayload()
	{
		_readLen = 0;
		if (_length == 0)
		{
			_state = ProcessingState.PS_READ_HEADER;
			dispatchFrame();
		}
		else
		{
			_state = ProcessingState.PS_READ_PAYLOAD;
		}
	}

	/// Called once `_length` holds the final payload size: allocates the
	/// payload buffer and moves on to the masking key or the payload.
	void onLengthKnown()
	{
		// NOTE(review): the buffer is allocated eagerly from a peer-supplied
		// length; consider enforcing an application-level maximum frame size.
		frame.data = new ubyte[_length];
		_readLen = 0;
		if (_hasMask)
			_state = ProcessingState.PS_READ_MASK;
		else
			beginPayload();
	}

	/// Validates a decoded 64-bit payload length. RFC 6455 section 5.2 requires
	/// the most significant bit to be 0; lengths that do not fit size_t are
	/// rejected as well instead of being silently truncated.
	void acceptBigLength(ulong value)
	{
		_readLen = 0;
		if (value > cast(ulong) long.max || value > size_t.max)
		{
			frame._closeCode = CloseCode.CloseCodeProtocolError;
			frame._closeReason = "Payload length is out of range";
			frame._isValid = false;
			_state = ProcessingState.PS_READ_HEADER;
			dispatchFrame();
			return;
		}
		_length = cast(size_t) value;
		onLengthKnown();
	}

	TransportDirection _transportDirection; // direction given at construction
	bool _finished;                         // read by isBusy(); never assigned in this class
	bool _shouldClose = false;              // set once a Close frame has been generated
	CallBack _callback;                     // receiver of onWsFrame
	HTTPTransaction _transaction;           // may become null through detach()

	ProcessingState _state;                 // current parser state
	OpCode _lastcode;                       // opcode of the most recent non-continuation data frame
	WSFrame frame;                          // frame currently being assembled
	bool _hasMask;                          // MASK bit of the current frame
	ubyte[4] _mask;                         // masking key of the current frame
	ubyte[8] _buffer;                       // scratch space for split extended-length fields
	size_t _length;                         // payload length of the current frame
	size_t _readLen;                        // bytes collected so far for the field/payload in progress
}
509